🖥️☁️AWS S3 Integration with PySpark: Data Cleaning and Storage

 


Introduction

AWS Simple Storage Service (S3) is a widely used cloud storage solution that integrates seamlessly with Apache Spark. In this blog, we'll demonstrate how to connect PySpark with AWS S3, clean data using regex, and store processed data in an RDS database.


1. Setting Up AWS Credentials

Before accessing AWS S3, you need to authenticate with your Access Key and Secret Key. The snippet below uses placeholder strings for illustration; in practice, store credentials securely rather than hardcoding them (a secrets-based alternative is sketched after the snippet).

import urllib.parse

ACCESS_KEY = "your-access-key"
SECRET_KEY = "your-secret-key"
# URL-encode the secret key so characters such as "/" don't break the mount URI.
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, safe="")
AWS_BUCKET_NAME = "your-bucket-name"
MOUNT_NAME = "s3data"
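
In a Databricks workspace, one way to avoid hardcoding is to pull the keys from a secret scope at runtime. The sketch below assumes a secret scope named aws with keys access-key and secret-key; those names are illustrative, not part of the original setup.

import urllib.parse

# Hypothetical secret scope and key names -- adjust to your workspace setup.
ACCESS_KEY = dbutils.secrets.get(scope="aws", key="access-key")
SECRET_KEY = dbutils.secrets.get(scope="aws", key="secret-key")

# URL-encode the secret so characters such as "/" don't break the mount URI.
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, safe="")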

2. Mounting AWS S3 to Databricks File System (DBFS)

Mounting S3 to Databricks allows seamless access to stored files.

# Mount the bucket at /mnt/s3data via the s3a connector.
dbutils.fs.mount(
    "s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME),
    "/mnt/%s" % MOUNT_NAME
)

# List the mounted files to verify the mount worked.
display(dbutils.fs.ls("/mnt/%s" % MOUNT_NAME))

Output:

Path
/mnt/s3data/sample-data.csv
/mnt/s3data/logs/
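
If the notebook is re-run, the mount call fails when the mount point already exists. A small guard like the one below (a sketch using the standard dbutils.fs.mounts() listing) keeps the cell idempotent.

# Re-mounting an existing mount point raises an error, so check first.
if not any(m.mountPoint == "/mnt/%s" % MOUNT_NAME for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        "s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME),
        "/mnt/%s" % MOUNT_NAME
    )

# To detach the bucket later:
# dbutils.fs.unmount("/mnt/%s" % MOUNT_NAME)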

3. Loading Data from S3

Once mounted, we can load CSV files into a PySpark DataFrame.

data_path = "/mnt/s3data/sample-data.csv"
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(data_path)
df.show()

Output:

+-----+----------+------------------+
| ID  | Name     | Email            |
+-----+----------+------------------+
| 101 | John Doe | john@example.com |
| 102 | Jane Doe | jane@example.com |
+-----+----------+------------------+
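
inferSchema is convenient for exploration, but it triggers an extra pass over the data and can guess types wrong. A possible alternative for repeatable loads is to declare the schema up front; the sketch below assumes the three columns shown above.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema: one pass over the file, predictable column types.
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Email", StringType(), True),
])

df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load(data_path)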

4. Data Cleaning Using Regular Expressions

4.1 Removing Special Characters from Column Names

Column names may contain unwanted special characters that need to be cleaned.

import re
from pyspark.sql.functions import col, regexp_replace

# Strip any character that is not a letter or digit from each column name.
cols = [re.sub("[^a-zA-Z0-9]", "", c) for c in df.columns]
df = df.toDF(*cols)
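
As a quick illustration of what the regex does, here it is applied to a few hypothetical messy column names (not from the dataset above):

messy = ["Customer ID#", "e-mail address", "Zip Code (US)"]
clean = [re.sub("[^a-zA-Z0-9]", "", c) for c in messy]
print(clean)
# ['CustomerID', 'emailaddress', 'ZipCodeUS']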

4.2 Cleaning Email Data

Let's remove @ and . from email addresses.

df = df.withColumn("Email", regexp_replace(col("Email"), "[@.]", ""))
df.show()

Output:

+-----+----------+----------------+
| ID  | Name     | Email          |
+-----+----------+----------------+
| 101 | John Doe | johnexamplecom |
| 102 | Jane Doe | janeexamplecom |
+-----+----------+----------------+
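
Stripping @ and . makes the addresses unusable as emails, which is fine if the goal is simple obfuscation. If you instead want to keep structure, regexp_extract can pull out parts of the address; the sketch below is an alternative, not part of the original pipeline, and would run on the original Email column before the replacement above.

from pyspark.sql.functions import regexp_extract

# Applied to the original Email column (before the @/. removal above):
# capture everything after the "@" as the domain; empty string if no match.
df_with_domain = df.withColumn("Domain", regexp_extract(col("Email"), "@(.+)$", 1))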

5. Storing Cleaned Data in RDS Database

5.1 Configuring Database Connection

# JDBC connection settings for the target RDS (MySQL) instance.
host = "jdbc:mysql://your-rds-instance.amazonaws.com:3306/yourdb"
username = "admin"
password = "yourpassword"  # in practice, fetch this from a secret store
dbtable = "cleaned_data"

5.2 Writing Data to MySQL

df.write.mode("append").format("jdbc").option("url", host)\
    .option("user", username)\
    .option("password", password)\
    .option("dbtable", dbtable)\
    .save()
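
Depending on the cluster, the JDBC driver class may need to be set explicitly (for MySQL, com.mysql.cj.jdbc.Driver). A quick way to sanity-check the write is to read the table back through the same connection; a minimal sketch:

# Read the table back to confirm the rows landed in RDS.
check_df = spark.read.format("jdbc") \
    .option("url", host) \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", dbtable) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

check_df.show(5)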

The cleaned records are appended to the cleaned_data table in RDS.

6. Batch Processing Multiple Files from S3

We can loop through files in S3, load each one into a DataFrame, and store it in its own RDS table.

import os

data_folder = "/mnt/s3data/csvdata"
files = dbutils.fs.ls(data_folder)
file_paths = [f.path for f in files if "us-500" in f.path and f.path.endswith(".csv")]

for p in file_paths:
    filename = os.path.splitext(os.path.basename(p))[0]
    tablename = re.sub(r'[^a-zA-Z0-9]', '', filename)
    print(f"Importing {p} into table {tablename}")
    
    df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(p)
    df.write.mode("overwrite").format("jdbc").option("url", host)\
        .option("user", username)\
        .option("password", password)\
        .option("dbtable", tablename)\
        .save()

Output:

Importing /mnt/s3data/csvdata/us-500.csv into table us500
Each matching file is then written to its own RDS table (here, us500).
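
If one file in the batch is malformed, the loop above stops at the first failure. A simple extension (a sketch, not part of the original code) is to catch and log per-file errors so the remaining files still get imported:

for p in file_paths:
    filename = os.path.splitext(os.path.basename(p))[0]
    tablename = re.sub(r'[^a-zA-Z0-9]', '', filename)
    try:
        df = spark.read.format("csv").option("header", "true") \
            .option("inferSchema", "true").load(p)
        df.write.mode("overwrite").format("jdbc").option("url", host) \
            .option("user", username) \
            .option("password", password) \
            .option("dbtable", tablename) \
            .save()
        print(f"Imported {p} into table {tablename}")
    except Exception as e:
        # Log and continue with the next file instead of aborting the batch.
        print(f"Failed to import {p}: {e}")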

Conclusion

In this blog, we explored how to:

  • Mount an AWS S3 bucket to Databricks.
  • Read data from S3 into PySpark.
  • Clean data using regex operations.
  • Store processed data into an RDS database.

This approach helps in efficient data processing and ensures that cleaned data is stored securely for further analysis.
